package cn.hetra.hj212.client;

import cn.hetra.hj212.core.HJ212Codec;
import cn.hetra.hj212.core.HJ212Data;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.ImmediateEventExecutor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.MonoToListenableFutureAdapter;
import org.springframework.util.concurrent.SettableListenableFuture;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.netty.Connection;
import reactor.netty.FutureMono;
import reactor.netty.NettyInbound;
import reactor.netty.NettyOutbound;
import reactor.netty.tcp.TcpClient;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

public class HJ212TcpClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(HJ212TcpClient.class);

    LoggingHandler loggingHandler = new LoggingHandler();

    private static final int PUBLISH_ON_BUFFER_SIZE = 16;

    private final TcpClient tcpClient;

    private volatile boolean stopping;

    private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler");
    private ListenableFuture<Void> handleShuttingDownConnectFailure(TcpConnectionHandler handler) {
        IllegalStateException ex = new IllegalStateException("Shutting down.");
        handler.afterConnectFailure(ex);
        return new MonoToListenableFutureAdapter<>(Mono.error(ex));
    }
    @Nullable
    private final ChannelGroup channelGroup;
    public HJ212TcpClient(String host, int port) {
        Assert.notNull(host, "host is required");
        this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
        this.tcpClient = TcpClient.create()
                .doOnChannelInit((connectionObserver, channel, remoteAddress) ->
                        channel.pipeline().addFirst(new LineBasedFrameDecoder(1024)
                                , new StringDecoder(), new StringEncoder(), loggingHandler, new HJ212Codec()))
                .host(host)
                .port(port)
                .doOnConnected(conn -> this.channelGroup.add(conn.channel()));
    }

    private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> {

        private final TcpConnectionHandler connectionHandler;

        ReactorNettyHandler(TcpConnectionHandler handler) {
            this.connectionHandler = handler;
        }
        @Override
        @SuppressWarnings("unchecked")
        public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
            inbound.withConnection(conn -> {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Connected to " + conn.address());
                }
            });
            Sinks.Empty<Void> completionSink = Sinks.empty();
            TcpConnection connection = new ReactorNettyTcpConnection(inbound, outbound, completionSink);
            scheduler.schedule(() -> this.connectionHandler.afterConnected(connection));
            inbound.receiveObject()
                    .cast(HJ212Data.class)
                    .publishOn(scheduler, PUBLISH_ON_BUFFER_SIZE)
                    .subscribe(
                            this.connectionHandler::handleMessage,
                            this.connectionHandler::handleFailure,
                            this.connectionHandler::afterConnectionClosed);

            return completionSink.asMono();
        }
    }

    public ListenableFuture<Void> connect(TcpConnectionHandler handler) {
        Assert.notNull(handler, "TcpConnectionHandler is required");
        if (this.stopping) {
            return handleShuttingDownConnectFailure(handler);
        }
        // Report first connect to the ListenableFuture
        CompletableFuture<Void> connectFuture = new CompletableFuture<>();
        tcpClient
                .handle(new ReactorNettyHandler(handler))
                .connect()
                .doOnNext(conn -> connectFuture.complete(null))
                .doOnError(connectFuture::completeExceptionally)
                .doOnError(handler::afterConnectFailure)    // report all connect failures to the handler
                .flatMap(Connection::onDispose)             // post-connect issues
                .retryWhen(Retry.from(signals -> signals
                        .map(retrySignal -> (int) retrySignal.totalRetriesInARow())
                        .flatMap(attempt -> reconnect(attempt))))
                .repeatWhen(flux -> flux
                        .scan(1, (count, element) -> count++)
                        .flatMap(attempt -> reconnect(attempt)))
                .subscribe();

        return new CompletableToListenableFutureAdapter<>(connectFuture);
    }

    private Publisher<? extends Long> reconnect(Integer attempt) {
        Long time = 1000L;
        return (time != null ? Mono.delay(Duration.ofMillis(time), this.scheduler) : Mono.empty());
    }

    private Mono<Void> stopScheduler() {
        return Mono.fromRunnable(() -> {
            this.scheduler.dispose();
            for (int i = 0; i < 20; i++) {
                if (this.scheduler.isDisposed()) {
                    break;
                }
                try {
                    Thread.sleep(100);
                }
                catch (Throwable ex) {
                    break;
                }
            }
        });
    }

    public ListenableFuture<Void> shutdown() {
        if (this.stopping) {
            SettableListenableFuture<Void> future = new SettableListenableFuture<>();
            future.set(null);
            return future;
        }
        this.stopping = true;
        Mono<Void> result;
        if (this.channelGroup != null) {
            result = FutureMono.from(this.channelGroup.close());
            result = result.onErrorResume(ex -> Mono.empty()).then(stopScheduler());
        }
        else {
            result = stopScheduler();
        }
        return new MonoToListenableFutureAdapter<>(result);
    }
}
